package org.codehaus.activemq.transport.jrms;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import com.sun.multicast.reliable.RMException;
import com.sun.multicast.reliable.transport.RMPacketSocket;
import com.sun.multicast.reliable.transport.SessionDoneException;
import com.sun.multicast.reliable.transport.lrmp.LRMPTransportProfile;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.URI;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.AbstractTransportChannel;
import org.codehaus.activemq.util.IdGenerator;

public class JRMSTransportChannel extends AbstractTransportChannel
  implements Runnable
{
  private static final int SOCKET_BUFFER_SIZE = 32768;
  private static final Log log = LogFactory.getLog(JRMSTransportChannel.class);
  private WireFormat wireFormat;
  private SynchronizedBoolean closed;
  private SynchronizedBoolean started;
  private Thread thread;
  private RMPacketSocket socket;
  private IdGenerator idGenerator;
  private String channelId;
  private int port;
  private InetAddress inetAddress;
  private Object lock;

  protected JRMSTransportChannel(WireFormat wireFormat)
  {
    this.wireFormat = wireFormat;
    this.idGenerator = new IdGenerator();
    this.channelId = this.idGenerator.generateId();
    this.closed = new SynchronizedBoolean(false);
    this.started = new SynchronizedBoolean(false);
    this.lock = new Object();
  }

  public JRMSTransportChannel(WireFormat wireFormat, URI remoteLocation)
    throws JMSException
  {
    this(wireFormat);
    try {
      this.port = remoteLocation.getPort();
      this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
      LRMPTransportProfile profile = new LRMPTransportProfile(this.inetAddress, this.port);
      profile.setTTL((byte)1);
      profile.setOrdered(true);
      this.socket = profile.createRMPacketSocket(3);
    }
    catch (Exception ioe) {
      ioe.printStackTrace();
      JMSException jmsEx = new JMSException("Initialization of JRMSTransportChannel failed: " + ioe);
      jmsEx.setLinkedException(ioe);
      throw jmsEx;
    }
  }

  public void stop()
  {
    if (this.closed.commit(false, true)) {
      super.stop();
      try {
        this.socket.close();
      }
      catch (Exception e) {
        log.trace(toString() + " now closed");
      }
    }
  }

  public void start()
    throws JMSException
  {
    if (this.started.commit(false, true)) {
      this.thread = new Thread(this, "Thread:" + toString());
      this.thread.setDaemon(true);
      this.thread.start();
    }
  }

  public void asyncSend(Packet packet)
    throws JMSException
  {
    try
    {
      DatagramPacket dpacket = createDatagramPacket(packet);

      this.socket.send(dpacket);
    }
    catch (RMException rme)
    {
      JMSException jmsEx = new JMSException("syncSend failed " + rme.getMessage());
      jmsEx.setLinkedException(rme);
      throw jmsEx;
    }
    catch (IOException e) {
      JMSException jmsEx = new JMSException("asyncSend failed " + e.getMessage());
      jmsEx.setLinkedException(e);
      throw jmsEx;
    }
  }

  public boolean isMulticast()
  {
    return true;
  }

  public void run()
  {
    try
    {
      while (!this.closed.get()) {
        DatagramPacket dpacket = this.socket.receive();
        Packet packet = this.wireFormat.readPacket(this.channelId, dpacket);
        if (packet != null) {
          doConsumePacket(packet);
        }
      }
      log.trace("The socket peer is now closed");

      stop();
    }
    catch (SessionDoneException e)
    {
      log.trace("Session completed", e);
      stop();
    }
    catch (RMException ste) {
      doClose(ste);
    }
    catch (IOException e) {
      doClose(e);
    }
  }

  protected DatagramPacket createDatagramPacket() {
    DatagramPacket answer = new DatagramPacket(new byte[32768], 32768);
    answer.setPort(this.port);
    answer.setAddress(this.inetAddress);
    return answer;
  }

  protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
    DatagramPacket answer = this.wireFormat.writePacket(this.channelId, packet);
    answer.setPort(this.port);
    answer.setAddress(this.inetAddress);
    return answer;
  }

  private void doClose(Exception ex) {
    if (!this.closed.get()) {
      JMSException jmsEx = new JMSException("Error reading socket: " + ex);
      jmsEx.setLinkedException(ex);
      onAsyncException(jmsEx);
      stop();
    }
  }

  public String toString()
  {
    return "JRMSTransportChannel: " + this.socket;
  }
}